coordinated compaction improvements #29486
Conversation
Pull request overview
This PR fixes a bug where group offset updates in coordinated compaction could be lost during leadership transfers. The core issue was that the compaction coordinator did not properly handle leadership changes, potentially losing MTRO/MXRO updates that were in flight when leadership transferred.
Changes:
- Modified leadership change handling to track the leadership term and force replication of group offsets when leadership changes
- Updated test to handle transient leadership transfer failures and retry replication/leadership operations
- Removed the `flaky = True` flag from the test build configuration after fixing the underlying race condition
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/v/raft/compaction_coordinator.h | Added term tracking and force update flag for leadership changes |
| src/v/raft/compaction_coordinator.cc | Implemented term-based leadership tracking and forced offset distribution on leadership change |
| src/v/raft/consensus.cc | Updated leadership notification to pass term ID to compaction coordinator |
| src/v/raft/tests/coordinated_compaction_test.cc | Enhanced test robustness with retry logic for leadership transfers and replication |
| src/v/raft/tests/BUILD | Removed flaky test flag |
src/v/raft/compaction_coordinator.cc
Outdated
// we will retry later anyway. Retries may be cancelled
// a) on leadership change, where the flag will be reset back to true;
// or
// b) on a further call to `on_group_offsets_update` due to
The comment references `on_group_offsets_update`, which has been removed; it should say `update_group_offsets`.
Suggested change:
- // b) on a further call to `on_group_offsets_update` due to
+ // b) on a further call to `update_group_offsets` due to
| "compaction coordinator planning to distribute group offsets in {}", | ||
| group_offsets_send_delay); |
The log message is missing a unit for the delay. Consider changing to 'planning to distribute group offsets in {} seconds' or using appropriate formatting for the duration.
| "compaction coordinator planning to distribute group offsets in {}", | |
| group_offsets_send_delay); | |
| "compaction coordinator planning to distribute group offsets in {} ms", | |
| std::chrono::duration_cast<std::chrono::milliseconds>( | |
| group_offsets_send_delay) | |
| .count()); |
fmt prints (or rather should print) durations in human-readable form
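For reference, a minimal sketch of the behavior being described, assuming the log line ultimately goes through fmt with `<fmt/chrono.h>` available; the delay value here is made up for illustration:

```cpp
#include <chrono>
#include <fmt/chrono.h> // enables formatting of std::chrono::duration
#include <fmt/core.h>

int main() {
    using namespace std::chrono_literals;
    auto group_offsets_send_delay = 10s; // hypothetical delay value
    // fmt appends the unit suffix itself, e.g. "10s" or "500ms",
    // so no explicit duration_cast or unit in the format string is needed.
    fmt::print(
      "compaction coordinator planning to distribute group offsets in {}\n",
      group_offsets_send_delay);
}
```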
CI test results:
- test results on build#79952
- test results on build#79978
- test results on build#80040
void compaction_coordinator::on_group_offsets_update() {
    if (_is_leader) {
      if (
        _leader_term_id && (mtro_updated || mxro_updated || _need_force_update)) {
How could this get called with the right branch of the 'and' false? That is, can this be called with mtro_updated false, mxro_updated false, and _need_force_update false?
E.g. in the following situation: we are the leader and have been for a while in the current term. The timer triggered collect_all_replica_offsets, which called update_local_replica_offsets. Local offsets have increased, so it called recalculate_group_offsets. However, the local offsets were not the lowest, as some replica is lagging and holding compaction back, so new_mtro and new_mxro remained the same. update_group_offsets is then called with the same values as at some earlier point, so mtro_updated and mxro_updated are both false. _need_force_update is false as well, since this is not the first update in the current term.
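A self-contained sketch of that situation, reduced to the flags discussed here; the struct and helper are hypothetical simplifications, not the actual coordinator code:

```cpp
#include <cstdint>
#include <optional>

// Minimal, illustrative model of the flags discussed in this thread; the real
// coordinator lives in src/v/raft/compaction_coordinator.cc and differs.
struct coordinator_state {
    std::optional<int64_t> leader_term_id; // set while we are leader
    int64_t last_sent_mtro = 0;            // hypothetical bookkeeping
    int64_t last_sent_mxro = 0;            // hypothetical bookkeeping
    bool need_force_update = false;        // set on leadership change
};

// Returns true when the leader should distribute group offsets.
bool should_distribute(
  const coordinator_state& st, int64_t new_mtro, int64_t new_mxro) {
    bool mtro_updated = new_mtro != st.last_sent_mtro;
    bool mxro_updated = new_mxro != st.last_sent_mxro;
    // Later in the same term, a lagging replica can keep new_mtro/new_mxro
    // equal to the previously sent values, and need_force_update has already
    // been cleared -- so every operand of the right-hand side is false.
    return st.leader_term_id.has_value()
           && (mtro_updated || mxro_updated || st.need_force_update);
}

int main() {
    coordinator_state st{.leader_term_id = 7, .last_sent_mtro = 100,
                         .last_sent_mxro = 100, .need_force_update = false};
    // Same offsets as before: should_distribute() is false, which is the
    // situation the reviewer asked about.
    return should_distribute(st, 100, 100) ? 1 : 0;
}
```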
auto last_data_offset = r.value().last_offset;
last_data_offsets.push_back(last_data_offset);
model::offset last_data_offset;
while (true) {
nit: while true without a loop-global timeout makes me nervous
I'll change it into a `for (int attempt = 0; attempt < 5; ++attempt)` loop.
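A self-contained sketch of the bounded-retry shape being proposed; `replicate_batch` and the back-off are hypothetical stand-ins for the test fixture's calls, not the real test code:

```cpp
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <optional>
#include <thread>

// Hypothetical stand-in for the test's replicate call: fails a couple of
// times (as a concurrent leadership transfer would cause) before succeeding.
std::optional<int64_t> replicate_batch() {
    static int calls = 0;
    return ++calls < 3 ? std::nullopt : std::optional<int64_t>{42};
}

std::optional<int64_t> replicate_with_retries() {
    using namespace std::chrono_literals;
    // Bounded retry instead of `while (true)`: tolerates transient
    // leadership-transfer failures without risking an unbounded loop.
    for (int attempt = 0; attempt < 5; ++attempt) {
        if (auto last_offset = replicate_batch()) {
            return last_offset;
        }
        std::this_thread::sleep_for(100ms); // back off before retrying
    }
    return std::nullopt; // caller asserts on this instead of hanging forever
}

int main() {
    auto r = replicate_with_retries();
    std::printf("replicated up to offset %lld\n",
                r ? static_cast<long long>(*r) : -1);
}
```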
node_id,
committed_offset);
return committed_offset == last_data_offset;
return committed_offset >= last_data_offset;
What is the reason behind this one?
Due to a leadership transfer, a configuration batch may have been replicated after the data.
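A tiny illustrative sketch of why `>=` is the right check under that assumption: a configuration batch appended by the leadership transfer pushes the committed offset past the last data offset.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Illustrative log contents: data batches plus a configuration batch that a
// leadership transfer appends after the data.
enum class batch_type { data, configuration };

int main() {
    std::vector<batch_type> log = {
      batch_type::data, batch_type::data,
      batch_type::configuration, // appended by the leadership transfer
    };

    int64_t last_data_offset = 1;                                     // last data batch
    int64_t committed_offset = static_cast<int64_t>(log.size()) - 1;  // = 2

    // `committed_offset == last_data_offset` would fail here even though all
    // data is committed; `>=` tolerates the trailing configuration batch.
    assert(committed_offset >= last_data_offset);
    assert(committed_offset != last_data_offset);
    return 0;
}
```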
joe-redpanda left a comment
Some questions
some nits
globally looks good
Force-pushed from 70c47a3 to a88d11a.
ASSERT_FALSE_CORO(r.has_error());
auto last_data_offset = r.value().last_offset;
last_data_offsets.push_back(last_data_offset);
model::offset last_data_offset;
The variable last_data_offset is uninitialized and will remain uninitialized if all 5 retry attempts fail (lines 55-70). After the loop completes, it's used unconditionally at lines 71-75 and 99, which will result in undefined behavior. Initialize it to a sentinel value or add an assertion after the retry loop to ensure it was set.
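A self-contained sketch of the two mitigations suggested above, using `std::optional` as the sentinel; `try_replicate` is a hypothetical stand-in for a single replication attempt in the test:

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical stand-in for one replication attempt in the test; returns the
// last data offset on success and nullopt on a transient failure.
std::optional<int64_t> try_replicate() {
    static int calls = 0;
    return ++calls < 2 ? std::nullopt : std::optional<int64_t>{123};
}

int64_t replicate_or_abort() {
    // Option 1: an optional acts as the sentinel instead of an uninitialized
    // model::offset, so "never set" is representable.
    std::optional<int64_t> last_data_offset;
    for (int attempt = 0; attempt < 5 && !last_data_offset; ++attempt) {
        last_data_offset = try_replicate();
    }
    // Option 2: assert after the loop, so exhausting all attempts fails the
    // test instead of reading an indeterminate value later on.
    assert(last_data_offset.has_value());
    return *last_data_offset;
}

int main() { return replicate_or_abort() == 123 ? 0 : 1; }
```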
_need_force_update = true;
}
_leader_term_id = {new_term};
arm_timer_if_needed(true);
is there a risk of arming the timer twice if the same replica is elected leader back to back?
An update to group offsets may be lost due to leadership transfers. We would previously tolerate it: partitions are typically written into and compacted, so coordination offsets often move and a new update will get through.

Change it so that such updates are never lost:
- Store the current leadership term.
- In each term, have the leader distribute group offsets to followers regardless of whether they changed.
- When replica offsets are updated with group updates, do not recalculate group updates. This is to avoid pointless updates.
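A self-contained sketch of the term-tracking idea from this commit message; the struct mirrors the member names seen in the diff excerpts, but the logic is illustrative only, not the actual coordinator:

```cpp
#include <cstdint>
#include <optional>

// Illustrative model of the leadership-change handling described above.
struct compaction_coordinator_model {
    std::optional<int64_t> leader_term_id; // term in which we are leader
    bool need_force_update = false;        // distribute once per term even if
                                           // group offsets did not change

    void on_leadership_change(bool is_leader, int64_t new_term) {
        if (!is_leader) {
            leader_term_id.reset();
            return;
        }
        if (!leader_term_id || *leader_term_id != new_term) {
            // New term: an update that was in flight under the previous
            // leader may have been lost, so force one distribution.
            need_force_update = true;
        }
        leader_term_id = new_term;
    }

    // Cleared only once a distribution in this term has gone through.
    void on_offsets_distributed() { need_force_update = false; }
};

int main() {
    compaction_coordinator_model c;
    c.on_leadership_change(true, 7); // becomes leader in term 7
    return c.need_force_update ? 0 : 1;
}
```

The design choice sketched here is the one the commit message describes: each new term forces at least one distribution, so an update lost during the transfer is replayed.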
Force-pushed from a88d11a to 9367a09.
void compaction_coordinator::recalculate_group_offsets() {
    vassert(
      _is_leader, "only leader can recalculate max tombstone remove offset");
    vassert(_leader_term_id, "only leader can recalculate group offsets");
Using _leader_term_id (an std::optional<model::term_id>) directly in a boolean context is incorrect. The assertion should check _leader_term_id.has_value() to properly verify whether there is a valid leader term ID.
Suggested change:
- vassert(_leader_term_id, "only leader can recalculate group offsets");
+ vassert(
+   _leader_term_id.has_value(),
+   "only leader can recalculate group offsets");
Make the test resilient to leader transfers, which may happen under load:
- increase timeouts;
- check for the current leader;
- retry on errc::not_leader;
- allow for configuration batch offsets.

Also add a test for no lost updates after a leadership transfer.
Force-pushed from 9367a09 to 00b2afb.
/backport v25.3.x

Failed to create a backport PR to v25.3.x branch. I tried:
I'm OOTL. Where did the motivation for these improvements come from? CI failures? |
https://redpandadata.atlassian.net/browse/CORE-15445
An update to group offsets may be lost due to leadership transfers.
We previously tolerated it: partitions are typically written into and compacted, so coordination offsets often move and a new update will get through.
However, it is difficult to test this way; this PR fixes both the functionality and the test.
Backports Required
Release Notes